# --- Imports (grouped; duplicate `col` import removed) -----------------------
import re

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col,
    explode,
    monotonically_increasing_id,
    regexp_replace,
    split,
    transform,
    when,
)

# Default to static SVG rendering (re-set for notebooks further below).
pio.renderers.default = "svg"

# Set random seed so any NumPy-based sampling is reproducible
np.random.seed(42)

# Change Plotly renderer for notebooks
pio.renderers.default = "notebook"
Assignment 03
1 Import Packages
2 Plotly Template
# Register a custom "nike" Plotly template: layout-wide font/color defaults
# plus per-trace styling for bar charts.
pio.templates["nike"] = go.layout.Template(
    # LAYOUT
    layout={
        # Fonts
        # Note - 'family' must be a single string, NOT a list or dict!
        'title': {
            'font': {
                'family': 'HelveticaNeue-CondensedBold, Helvetica, Sans-serif',
                'size': 30,
                'color': '#333',
            }
        },
        'font': {
            'family': 'Helvetica Neue, Helvetica, Sans-serif',
            'size': 16,
            'color': '#333',
        },
        # Colorways
        'colorway': ['#ec7424', '#a4abab'],
        # Keep adding others as needed below
        'hovermode': 'x unified',
    },
    # DATA
    data={
        # Each graph object must be in a tuple or list for each trace
        'bar': [
            go.Bar(
                texttemplate='%{value:$.2s}',
                textposition='outside',
                textfont={
                    'family': 'Helvetica Neue, Helvetica, Sans-serif',
                    'size': 20,
                    'color': '#FFFFFF',
                },
            )
        ]
    },
)
# 3 Load Dataset
# Spark session used for all data loading and transformation below.
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Read the Lightcast postings CSV: headers present, schema inferred,
# multi-line fields allowed, embedded quotes escaped with a double quote.
df = (
    spark.read
    .options(header="true", inferSchema="true", multiLine="true", escape="\"")
    .csv("/home/ubuntu/assignment-03-Sabrina1211/data/lightcast_job_postings.csv")
)

# Diagnostic check only — keep commented out in the rendered submission.
#print("---This is Diagnostic check, No need to print it in the final doc---")
#df.printSchema()  # comment this line when rendering the submission
#df.show(5)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/21 01:04:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:> (0 + 1) / 1]
4 Data Preparation
# Step 1: Cast the salary and experience columns from string to float so
# they can participate in the numeric computations below.
for _numeric_col in (
    "SALARY",
    "SALARY_FROM",
    "SALARY_TO",
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
):
    df = df.withColumn(_numeric_col, col(_numeric_col).cast("float"))
# Step 2: Helper for computing (approximate) medians of salary columns
def compute_median(sdf, col_name, quantile=0.5, rel_error=0.01):
    """Return the approximate quantile of ``col_name`` in DataFrame ``sdf``.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        DataFrame to query.
    col_name : str
        Name of the numeric column.
    quantile : float, optional
        Quantile to compute; defaults to 0.5 (the median).
    rel_error : float, optional
        Relative error passed to ``approxQuantile``; defaults to 0.01.

    Returns
    -------
    float or None
        The approximate quantile, or None when the column has no
        non-null values (``approxQuantile`` returns an empty list).
    """
    q = sdf.approxQuantile(col_name, [quantile], rel_error)
    return q[0] if q else None
# Approximate medians for each of the three salary columns.
median_from, median_to, median_salary = (
    compute_median(df, name) for name in ("SALARY_FROM", "SALARY_TO", "SALARY")
)
print("Medians:", median_from, median_to, median_salary)
# Step 4: Impute missing salary bounds with their column medians.
# Experience columns are deliberately left un-imputed.
# Guard: compute_median returns None for an all-null column, and
# DataFrame.fillna rejects None values — only fill with real numbers.
_salary_fill = {
    name: value
    for name, value in {"SALARY_FROM": median_from, "SALARY_TO": median_to}.items()
    if value is not None
}
if _salary_fill:
    df = df.fillna(_salary_fill)

# Step 5: Average salary = midpoint of the posted salary range.
df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)

# Step 6: Columns retained for the cleaned export.
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
]
df_selected = df.select(*export_cols)

# Step 7: Materialize to pandas and save as CSV.
pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)
print("Data cleaning complete. Rows retained:", len(pdf))
Medians: 87295.0 130042.0 115024.0
[Stage 5:> (0 + 1) / 1]
Data cleaning complete. Rows retained: 72498
5 Salary Distribution Employment Type
# Pull employment type + salary, filtering out missing/zero salaries in
# Spark so only valid rows reach pandas.
pdf = (
    df.select("EMPLOYMENT_TYPE_NAME", col("SALARY").cast("double").alias("SALARY"))
      .filter((col("SALARY").isNotNull()) & (col("SALARY") > 0))
      .toPandas()
)

# Clean labels: strip non-ASCII characters and surrounding whitespace.
pdf["EMPLOYMENT_TYPE_NAME"] = (
    pdf["EMPLOYMENT_TYPE_NAME"].astype("string").fillna("Unknown")
    .str.replace(r"[^\x00-\x7F]+", "", regex=True).str.strip()
)

# Order employment types by descending median salary.
sorted_employment_types = (
    pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()
    .sort_values(ascending=False).index
)
pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
    pdf["EMPLOYMENT_TYPE_NAME"],
    categories=sorted_employment_types,
    ordered=True,
)

# --- Box plot: show outlier points only ---
fig = px.box(
    pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    title="Salary Distribution by Employment Type",
    color_discrete_sequence=["#EF553B"],
    points="outliers",  # outliers only, not all points
)
fig.update_layout(
    xaxis=dict(title="Employment Type", categoryorder="array",
               categoryarray=sorted_employment_types.tolist(),
               tickfont=dict(size=18)),
    yaxis=dict(title="Salary (K $)", range=[0, 500000],
               tickvals=[0, 50_000, 100_000, 150_000, 200_000, 250_000,
                         300_000, 350_000, 400_000, 450_000, 500_000],
               ticktext=["0", "50K", "100K", "150K", "200K", "250K",
                         "300K", "350K", "400K", "450K", "500K"]),
    font=dict(family="Arial", size=16),
    plot_bgcolor="white",
    paper_bgcolor="white",
    showlegend=False,
    height=500,
    width=850,
)
fig.show()
fig.write_html("output/DistributionEmploymentType.html")
fig.write_image("output/DistributionEmploymentType.svg", width=850, height=500, scale=1)
6 Salary Distribution by Industry
# Salary distribution across 2-digit NAICS industries. No null/zero salary
# filtering is applied here (unlike the employment-type plot above).
pdf = df.select("NAICS2_NAME", "SALARY").toPandas()

fig = px.box(
    pdf,
    x="NAICS2_NAME",
    y="SALARY",
    color_discrete_sequence=["#EF553B"],
    title="Salary Distribution by Industry",
)

# Apply the custom template; use "plotly_white" if "nike" isn't registered.
fig.update_layout(template="nike")
# Slant the long industry names so the x-axis labels stay readable.
fig.update_xaxes(tickangle=45)

fig.show()
fig.write_html("output/DistributionIndustry.html")
fig.write_image("output/DistributionIndustry.svg", width=850, height=500, scale=1)